Reposts with deletions/edits V3 #3706

Open

wants to merge 23 commits into master

Changes from all commits

Commits (23)
ad62cb5
Update schema/Makefile.
moodyjon Oct 5, 2022
87d747c
Generate schema/types/v2 with ModifyStream/StreamExt.
moodyjon Oct 17, 2022
0f7d951
Bump to protobuf=3.20.1 to match my protoc version.
moodyjon Oct 14, 2022
50f2d81
TEMP: Bump to hub with protobuf=3.20.1 version
moodyjon Oct 17, 2022
6eac8cd
Revise schema to use protobuf.Any and update code.
moodyjon Oct 24, 2022
8645836
Link to specific hub branch.
moodyjon Oct 24, 2022
d696de1
Regenerate protobuf code at latest version.
moodyjon Oct 24, 2022
59bcb70
Fix use-before reference in apply(). Tweak logic so we can
moodyjon Oct 25, 2022
f12a7a2
Update schema for ext.StringMap adding repeated values.
moodyjon Nov 3, 2022
ce73f5d
Fix lint issues.
moodyjon Nov 3, 2022
2458499
Don't assume 'extensions' are present in Stream.
moodyjon Nov 3, 2022
1b18e48
Remove most of the supporting code for dynamic extensions.
moodyjon Nov 8, 2022
4adb0ce
Move away from using protobuf.Any in favor of StringMap as "oneof".
moodyjon Nov 10, 2022
ca4575d
Switch from StringMap -> protobuf.Struct to simplify.
moodyjon Nov 10, 2022
17d7aca
Cleanup some unneeded stuff.
moodyjon Nov 10, 2022
d9df6e0
Lint fix.
moodyjon Nov 10, 2022
3e2c11c
TEMP: workflow_dispatch
moodyjon Nov 10, 2022
b5a5520
MutableMapping -> Mapping as modifications are handled by merge().
moodyjon Nov 11, 2022
e0fa4b0
Update test to include claim search for extensions.
moodyjon Nov 15, 2022
a02b40c
Align lbry/schema with hub/schema regarding hex encoding/decoding.
moodyjon Nov 17, 2022
85a18ca
Install lbry-types as dependency (lbryio/types).
moodyjon Nov 30, 2022
fd4afeb
Correct path to wallet.json.
moodyjon Nov 30, 2022
4f1d82a
Fix setup.py syntax.
moodyjon Dec 1, 2022

2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -1,5 +1,5 @@
name: ci
on: ["push", "pull_request"]
on: ["push", "pull_request", "workflow_dispatch"]

jobs:

15 changes: 13 additions & 2 deletions lbry/extras/daemon/daemon.py
@@ -3357,8 +3357,13 @@ async def jsonrpc_stream_repost(
# TODO: use error from lbry.error
raise Exception('Invalid claim id. It is expected to be a 40 characters long hexadecimal string.')

reposted_txo = await self.ledger.get_claim_by_claim_id(claim_id, include_is_my_output=True)
if not isinstance(reposted_txo, Output) or not reposted_txo.is_claim:
raise InputValueError(f"Could not find claim '{claim_id}'.")
if not reposted_txo.can_decode_claim:
raise InputValueError(f"A claim with id '{claim_id}' was found but could not be decoded.")
claim = Claim()
claim.repost.update(**kwargs)
claim.repost.update(claim_type=reposted_txo.claim.claim_type, **kwargs)
claim.repost.reference.claim_id = claim_id
tx = await Transaction.claim_create(
name, claim, amount, claim_address, funding_accounts, funding_accounts[0], channel
@@ -3740,7 +3745,13 @@ async def jsonrpc_stream_update(
if old_txo.claim.is_stream:
claim.stream.update(file_path=file_path, **kwargs)
elif old_txo.claim.is_repost:
claim.repost.update(**kwargs)
reposted_id = old_txo.claim.repost.reference.claim_id
reposted_txo = await self.ledger.get_claim_by_claim_id(reposted_id, include_is_my_output=True)
if not isinstance(reposted_txo, Output) or not reposted_txo.is_claim:
raise InputValueError(f"Could not find reposted claim '{reposted_id}'.")
if not reposted_txo.can_decode_claim:
raise InputValueError(f"A claim with id '{reposted_id}' was found but could not be decoded.")
claim.repost.update(claim_type=reposted_txo.claim.claim_type, **kwargs)

if clear_channel:
claim.clear_signature()
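
The repost paths above now resolve the reposted claim first and pass its claim_type into claim.repost.update(), so the reference can record extension edits and deletions that only make sense for that claim type. Below is a minimal sketch of the same call at the schema level; it assumes the extensions/clear_extensions keyword arguments that ModifyingClaimReference.update() (added in lbry/schema/attrs.py further down) pops from kwargs, and every id and payload is made up.

# Illustrative sketch only; not part of this diff. Assumes Repost.update() forwards
# its unhandled kwargs to ModifyingClaimReference.update() as introduced by this PR.
from lbry.schema.claim import Claim

claim = Claim()
claim.repost.update(
    claim_type='stream',                           # taken from the resolved reposted claim
    extensions={'example': {'subtitle': 'yes'}},   # hypothetical edits to layer on top
    clear_extensions={'example': {'lang': 'en'}},  # hypothetical values to delete
)
claim.repost.reference.claim_id = '0' * 40         # placeholder 40-character hex claim id
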
4 changes: 2 additions & 2 deletions lbry/schema/Makefile
@@ -1,6 +1,6 @@
build:
rm types/v2/* -rf
rm -rf types/v2/*
touch types/v2/__init__.py
cd types/v2/ && protoc --python_out=. -I ../../../../../types/v2/proto/ ../../../../../types/v2/proto/*.proto
cd types/v2/ && cp ../../../../../types/jsonschema/* ./
sed -e 's/^import\ \(.*\)_pb2\ /from . import\ \1_pb2\ /g' -i types/v2/*.py
sed -e 's/^import\ \(.*\)_pb2\ /from . import\ \1_pb2\ /g' -i.bak types/v2/*.py
277 changes: 263 additions & 14 deletions lbry/schema/attrs.py
@@ -2,25 +2,28 @@
import logging
import os.path
import hashlib
from collections.abc import Mapping, Iterable
from typing import Tuple, List
from string import ascii_letters
from decimal import Decimal, ROUND_UP
from binascii import hexlify, unhexlify
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToDict, ParseDict, ParseError

from lbry.crypto.base58 import Base58
from lbry.constants import COIN
from lbry.error import MissingPublishedFileError, EmptyPublishedFileError

import lbry.schema.claim as claim
from lbry.schema.mime_types import guess_media_type
from lbry.schema.base import Metadata, BaseMessageList
from lbry.schema.tags import clean_tags, normalize_tag
from lbry.schema.types.v2.claim_pb2 import (
from lbry.schema.tags import normalize_tag
from google.protobuf.message import Message as ProtobufMessage
from lbry_types.v2.claim_pb2 import (
Claim as ClaimMessage,
Fee as FeeMessage,
Location as LocationMessage,
Language as LanguageMessage
Language as LanguageMessage,
)

from lbry_types.v2.extension_pb2 import Extension as ExtensionMessage

log = logging.getLogger(__name__)

@@ -173,11 +176,11 @@ def media_type(self, media_type: str):

@property
def file_hash(self) -> str:
return hexlify(self.message.hash).decode()
return self.message.hash.hex()

@file_hash.setter
def file_hash(self, file_hash: str):
self.message.hash = unhexlify(file_hash.encode())
self.message.hash = bytes.fromhex(file_hash)

@property
def file_hash_bytes(self) -> bytes:
@@ -189,11 +192,11 @@ def file_hash_bytes(self, file_hash_bytes: bytes):

@property
def sd_hash(self) -> str:
return hexlify(self.message.sd_hash).decode()
return self.message.sd_hash.hex()

@sd_hash.setter
def sd_hash(self, sd_hash: str):
self.message.sd_hash = unhexlify(sd_hash.encode())
self.message.sd_hash = bytes.fromhex(sd_hash)

@property
def sd_hash_bytes(self) -> bytes:
@@ -205,11 +208,11 @@ def sd_hash_bytes(self, sd_hash: bytes):

@property
def bt_infohash(self) -> str:
return hexlify(self.message.bt_infohash).decode()
return self.message.bt_infohash.hex()

@bt_infohash.setter
def bt_infohash(self, bt_infohash: str):
self.message.bt_infohash = unhexlify(bt_infohash.encode())
self.message.bt_infohash = bytes.fromhex(bt_infohash)

@property
def bt_infohash_bytes(self) -> bytes:
@@ -355,11 +358,11 @@ class ClaimReference(Metadata):

@property
def claim_id(self) -> str:
return hexlify(self.claim_hash[::-1]).decode()
return self.claim_hash[::-1].hex()

@claim_id.setter
def claim_id(self, claim_id: str):
self.claim_hash = unhexlify(claim_id)[::-1]
self.claim_hash = bytes.fromhex(claim_id)[::-1]

@property
def claim_hash(self) -> bytes:
@@ -369,6 +372,86 @@ def claim_hash(self) -> bytes:
def claim_hash(self, claim_hash: bytes):
self.message.claim_hash = claim_hash

class ModifyingClaimReference(ClaimReference):

__slots__ = ()

@property
def modification_type(self) -> str:
return self.message.WhichOneof('type')

@modification_type.setter
def modification_type(self, claim_type: str):
"""Select the appropriate member (stream, channel, repost, or collection)"""
old_type = self.message.WhichOneof('type')
if old_type == claim_type:
return
if old_type and claim_type is None:
self.message.ClearField(old_type)
return
member = getattr(self.message, claim_type)
member.SetInParent()

def update(self, claim_type: str, **kwargs) -> dict:
"""
Store updates to modifiable fields in deletions/edits.
Currently, only the "extensions" field (StreamExtensionMap)
of a stream claim may be modified. Returns a dict containing
the unhandled portion of "kwargs".
"""
if claim_type != 'stream':
return kwargs

clr_exts = kwargs.pop('clear_extensions', None)
set_exts = kwargs.pop('extensions', None)
if clr_exts is None and set_exts is None:
return kwargs

self.modification_type = claim_type
if not self.modification_type == 'stream':
return kwargs

mods = getattr(self.message, self.modification_type)

if clr_exts is not None:
deletions = StreamModifiable(mods.deletions)
if isinstance(clr_exts, str) and clr_exts.startswith('{'):
clr_exts = json.loads(clr_exts)
deletions.extensions.merge(clr_exts)

if set_exts is not None:
edits = StreamModifiable(mods.edits)
if isinstance(set_exts, str) and set_exts.startswith('{'):
set_exts = json.loads(set_exts)
edits.extensions.merge(set_exts)

return kwargs

def apply(self, reposted: 'claim.Claim') -> 'claim.Claim':
"""
Given a reposted claim, apply the stored deletions/edits, and return
the modified claim. Returns the original claim if the claim type has
changed such that the modifications are not relevant.
"""
if not self.modification_type or self.modification_type != reposted.claim_type:
return reposted
if not reposted.claim_type == 'stream':
return reposted

m = ClaimMessage()
m.CopyFrom(reposted.message)
result = claim.Claim(m)

# only stream claims, and only stream extensions are handled
stream = getattr(result, result.claim_type)
exts = getattr(stream, 'extensions')

mods = getattr(self.message, self.modification_type)
# apply deletions
exts.merge(StreamModifiable(mods.deletions).extensions, delete=True)
# apply edits
exts.merge(StreamModifiable(mods.edits).extensions)
return result
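
A minimal sketch of the update()/apply() round trip described in the docstrings above. It assumes Repost.reference is a ModifyingClaimReference and that Stream exposes the extensions map added by this PR; every schema name and value is made up.

# Illustrative sketch only; not part of this diff.
from lbry.schema.claim import Claim

# Stand-in for the resolved reposted stream claim.
reposted = Claim()
reposted.stream.extensions.merge({'example': {'rating': 'PG', 'lang': 'en'}})

# Record edits/deletions on the repost's claim reference.
repost = Claim()
unhandled = repost.repost.reference.update(
    'stream',
    extensions={'example': {'subtitle': 'yes'}},   # stored under edits
    clear_extensions={'example': {'lang': 'en'}},  # stored under deletions
    title='unrelated',                             # anything else is returned untouched
)

# Apply the stored modifications to a copy of the reposted claim.
modified = repost.repost.reference.apply(reposted)
# modified.stream.extensions.to_dict() would then be roughly
# {'example': {'rating': 'PG', 'subtitle': 'yes'}}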

class ClaimList(BaseMessageList[ClaimReference]):

@@ -569,3 +652,169 @@ def append(self, tag: str):
tag = normalize_tag(tag)
if tag and tag not in self.message:
self.message.append(tag)

class StreamExtension(Metadata):
__slots__ = Metadata.__slots__ + ('extension_schema',)

def __init__(self, schema, message):
super().__init__(message)
self.extension_schema = schema

def to_dict(self, include_schema=True):
attrs = self.unpacked.to_dict()
return { f'{self.schema}': attrs } if include_schema else attrs

def from_value(self, value):
schema = self.schema

# If incoming is an extension, we have an Extension message.
if isinstance(value, StreamExtension):
schema = value.schema or schema

# Translate str -> (JSON) dict.
if isinstance(value, str) and value.startswith('{'):
value = json.loads(value)

# Check for 1-element dictionary at top level: {<schema>: <attrs>}.
if isinstance(value, dict) and len(value) == 1:
k = next(iter(value.keys()))
if self.schema is None or self.schema == k:
# Schema is determined. Extract dict containing attrs.
schema = k
value = value[schema]

# Try to decode attrs dict -> Extension message containing protobuf.Struct.
if isinstance(value, dict):
try:
ext = StreamExtension(schema, ExtensionMessage())
ParseDict(value, ext.message.struct)
value = ext
except ParseError:
pass

# Either we have an Extension message or decoding failed.
if isinstance(value, StreamExtension):
self.extension_schema = value.schema or schema
self.message.CopyFrom(value.message)
else:
log.info('Could not parse StreamExtension value: %s type: %s', value, type(value))
raise ValueError(f'Could not parse StreamExtension value: {value}')

@property
def schema(self):
return self.extension_schema

@property
def unpacked(self):
return Struct(self.message.struct)

def merge(self, ext: 'StreamExtension', delete: bool = False) -> 'StreamExtension':
self.unpacked.merge(ext.unpacked, delete=delete)
return self
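
A minimal sketch of the value forms from_value() accepts: another StreamExtension, a JSON string, a {<schema>: <attrs>} dict, or a bare attrs dict once the schema is known. The schema name and attributes are made up, and the StreamExtension import path is assumed.

# Illustrative sketch only; not part of this diff.
from lbry.schema.attrs import StreamExtension   # assumed import for the class added above
from lbry_types.v2.extension_pb2 import Extension as ExtensionMessage

ext = StreamExtension('example', ExtensionMessage())
ext.from_value({'example': {'rating': 'PG'}})   # {<schema>: <attrs>} dict selects the schema
ext.from_value('{"rating": "PG-13"}')           # JSON string of attrs
ext.from_value({'rating': 'R'})                 # bare attrs dict (schema already determined)
assert ext.to_dict() == {'example': {'rating': 'R'}}
assert ext.to_dict(include_schema=False) == {'rating': 'R'}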

class Struct(Metadata, Mapping, Iterable):
__slots__ = ()

def to_dict(self) -> dict:
return MessageToDict(self.message)

def merge(self, other: 'Struct', delete: bool = False) -> 'Struct':
for k, v in other.message.fields.items():
if k not in self.message.fields:
if not delete:
self.message.fields[k].CopyFrom(v)
continue
my_value = self.message.fields[k]
my_kind = my_value.WhichOneof('kind')
kind = v.WhichOneof('kind')
if kind != my_kind:
continue
if kind == 'struct_value':
if len(v.struct_value.fields) > 0:
Struct(my_value).merge(v.struct_value, delete=delete)
elif delete:
del self.message.fields[k]
elif kind == 'list_value':
if len(v.list_value.values) > 0:
for _, o in enumerate(v.list_value.values):
for i, v in enumerate(my_value.list_value.values):
if v == o:
if delete:
del my_value.list_value.values[i]
break
if not delete:
if isinstance(o, ProtobufMessage):
my_value.list_value.values.add().CopyFrom(o)
else:
my_value.list_value.values.append(o)
elif delete:
del self.message.fields[k]
elif getattr(my_value, my_kind) == getattr(v, kind):
del self.message.fields[k]
return self

def __getitem__(self, key):
def extract(val):
if not isinstance(val, ProtobufMessage):
return val
kind = val.WhichOneof('kind')
if kind == 'struct_value':
return dict(Struct(val.struct_value))
elif kind == 'list_value':
return list(map(extract, val.list_value.values))
else:
return getattr(val, kind)
if key in self.message.fields:
val = self.message.fields[key]
return extract(val)
raise KeyError(key)

def __iter__(self):
return iter(self.message.fields)

def __len__(self):
return len(self.message.fields)
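
Struct is a thin read-only Mapping view over google.protobuf.Struct plus the recursive merge above. A minimal sketch of the view behaviour; the values are made up, and protobuf Struct stores all numbers as floats.

# Illustrative sketch only; not part of this diff.
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Struct as StructMessage
from lbry.schema.attrs import Struct   # assumed import for the wrapper added above

msg = StructMessage()
ParseDict({'rating': 'PG', 'tags': ['family', 'short'], 'meta': {'views': 3}}, msg)

s = Struct(msg)
assert s['rating'] == 'PG'
assert s['tags'] == ['family', 'short']
assert s['meta'] == {'views': 3.0}   # protobuf Struct numbers come back as floats
assert len(s) == 3 and 'rating' in s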

class StreamExtensionMap(Metadata, Mapping, Iterable):
__slots__ = ()
item_class = StreamExtension

def to_dict(self):
return { k: v.to_dict(include_schema=False) for k, v in self.items() }

def merge(self, exts, delete: bool = False) -> 'StreamExtensionMap':
if isinstance(exts, StreamExtension):
exts = {exts.schema: exts}
if isinstance(exts, str) and exts.startswith('{'):
exts = json.loads(exts)
for schema, ext in exts.items():
obj = StreamExtension(schema, ExtensionMessage())
if isinstance(ext, StreamExtension):
obj.from_value(ext)
else:
obj.from_value({schema: ext})
if delete and not len(obj.unpacked):
del self.message[schema]
continue
existing = StreamExtension(schema, self.message[schema])
existing.merge(obj, delete=delete)
return self

def __getitem__(self, key):
if key in self.message:
return StreamExtension(key, self.message[key])
raise KeyError(key)

def __iter__(self):
return iter(self.message)

def __len__(self):
return len(self.message)
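
A minimal sketch of StreamExtensionMap.merge(), which accepts a plain dict, a JSON string, or another extension/map, and removes a whole entry when an empty attrs dict is merged with delete=True. It assumes the Stream.extensions property added by this PR; schema names and values are made up.

# Illustrative sketch only; not part of this diff.
from lbry.schema.claim import Claim

claim = Claim()
exts = claim.stream.extensions                # StreamExtensionMap over the stream's extensions
exts.merge({'example': {'rating': 'PG'}})     # plain dict
exts.merge('{"other": {"lang": "en"}}')       # JSON string works too
exts.merge({'other': {}}, delete=True)        # empty attrs removes the whole 'other' entry
assert exts.to_dict() == {'example': {'rating': 'PG'}}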


class StreamModifiable(Metadata):
__slots__ = ()

@property
def extensions(self) -> StreamExtensionMap:
return StreamExtensionMap(self.message.extensions)