In [1]:
from datetime import datetime
from typing import Dict, List, Optional, Union
from k11.models.models import SourceMap, Format, ContentType, LinkStore, ContainerFormat, XMLContainerFormat

from secrets import token_urlsafe
import pandas as pd
from urllib.parse import urlparse
from k11.vault import connection_handler
from mongoengine import Q

connection_handler.mount_mongo_engines()


In [2]:
def create_map( name: str, home_link: str, formatter: str,links: List[Dict] = [], assumed_tags: str = '',
                     compulsory_tags: List[str] = [], is_rss: bool = False, is_collection: bool = True,
                     watermarks: List[str] = [], source_id: str = None, datetime_format: str="", is_third_party: bool = False,
                     source_locations=[]
                      ):
                      sm =  SourceMap(source_name=name, source_home_link=home_link,
                      source_id=token_urlsafe(16) + "_"+ name.lower() if source_id is None else source_id, 
                      formatter=formatter, assumed_tags=assumed_tags.strip(), _links=links,
                      compulsory_tags=compulsory_tags, is_collection=is_collection,
                      is_rss=is_rss, watermarks=watermarks,
                      datetime_format=datetime_format, is_third_party=is_third_party,
                      source_locations=source_locations
                      )
                      sm.save()
                      return sm

In [3]:
def tag_formatter(tag:str) -> Union[str, None]:
        return tag.replace("/",".").replace(" and ", " ")

In [4]:
def get_collection_selector(name, extras = [], defaults= {}):
    _defaults = {"sel": "xpath", "param": "text()", "type": "text", "parent": name, "is_multiple": False, "is_cdata": False}
    _defaults.update(defaults)
    data = {}
    for key in ['sel', 'param', 'parent', 'type', 'is_multiple', "is_cdata"] + extras:
        value = input(f"Please enter {key} value for {name}: ")
        if value == "" or value == " " and key in _defaults:
            data[key] = _defaults[key]
        else:
            data[key] = value
            if isinstance(_defaults[key], bool):
                data[key] = bool(value)
    return data


In [5]:
def is_quit_param(txt):
    return txt.lower() == "q" or txt == "" or txt == " " or txt.lower() == "quit"

In [6]:
def _create_collection_format():
    data = {}
    # is_xml = input('Enter x for xml_collection_format or h for html_collection_format: ').lower() == 'x'
    itertag = input('Enter itertag for this selector: ')
    if itertag != '' or itertag != " ":
        data['itertag'] = itertag
    data['title'] = get_collection_selector('title')
    data['link'] = get_collection_selector('link')
    data['creator'] = get_collection_selector('creator')
    while True:
        sec_act = input("Enter name for new selector or type 'q' quit for exiting: ")
        if is_quit_param(sec_act):
            break
        else:
            data[sec_act] = get_collection_selector(sec_act)
    return data




In [7]:
def create_collection_format(name, format_):
    data = _create_collection_format()
#     Format.objects(format_id=format_.format_id).update_one(**{f"set__{name}": data})
    setattr(format_, name, data)
    format_.save()
    return format_.reload()


In [8]:
def get_container_identity():
    data = {}
    data['param'] = input("Enter param for container identity: ")
    data['is_multiple'] = input("Is this identity for multiple items y/n: ").lower() == "y"
    data['content_type'] = input('Enter content-type for the identity a for article, i for image, v for video: ')
    cmap = {'a': 'article', 'i': 'image', 'v': 'video'}
    data['content_type'] = cmap[data['content_type']] if data['content_type'] != "" else cmap['a']
    data['is_bakeable'] = input('Is this identity contains multiple articles y/n: ').lower() == "y"
    title_selectors = []
    while True:
        action = input('Enter title selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            title_selectors.append(action)
    creator_selectors = []
    while True:
        action = input('Enter creator selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            creator_selectors.append(action)
    body_selectors = []
    while True:
        action = input('Enter body selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            body_selectors.append(action)
    return data, title_selectors, creator_selectors, body_selectors

In [9]:
def get_query_selector():
    data = {}
    for key in ['tag', 'id', 'class_list', 'exact_class']:
        value = input(f'Enter {key} for query: ')
        if key == "class_list" and len(value) > 0:
            data[key] = value.split(" ")
        elif len(value) > 0 and value != " ":
            data[key] = value
    return data


In [10]:
def _create_container_format():
    data = {}
    data['idens'] = []
    data['title_selectors'] = []
    data['creator_selectors'] = []
    data['body_selectors'] = []
    data['ignorables'] = []
    while True:
        action = input('Enter a for adding new identity, q for quit: ')
        if is_quit_param(action):
            break
        iden, title_selectors, creator_selectors, body_selectors = get_container_identity()
        data['title_selectors'] += title_selectors
        data['creator_selectors'] += creator_selectors
        data['body_selectors'] += body_selectors
        while True:
            action = input('Enter a for adding new ignorable, q for quit: ')
            if is_quit_param(action):
                break
            else:
                data['ignorables'].append(get_query_selector())
        data['idens'].append(iden)
    data['title_selectors'] = list(set(data['title_selectors']))
    data['creator_selectors'] = list(set(data['creator_selectors']))
    data['body_selectors'] = list(set(data['body_selectors']))
    return data
    



In [11]:
def create_html_container_format(format_):
    data = _create_container_format()
    format_._html_article_format = data
    format_.save()
    return format_



In [12]:
def create_extra_format(format_):
    extra_format = {}
    if hasattr(format_, "extra_formats") and format_.extra_formats is not None:
        extra_format = format_.extra_formats
    while True:
        action = input('Enter c for adding new collection format and a for adding article format, q for quit: ')
        if is_quit_param(action):
            break
        else:
            name = input("Enter name for the format: ")
            if is_quit_param(name):
                break
            if action.lower() == "c":
                data = _create_collection_format()
                extra_format[name] = data
            elif action.lower() == "a":
                data = _create_container_format()
                extra_format[name] = data
#     Format.objects(format_id=format_.format_id).update_one(extra_formats= extra_format)
    format_.extra_formats = extra_format
    format_.save()
    return format_.reload()

                


In [13]:
def create_xml_container_format(format_):
    data = {}
    ct = input('Enter content-type for format a for article, i for image, v for video: ')
    cmap = {"a": "article", "i": "image", "v": "video"}
    data['content_type'] = cmap[ct] if ct != "" or ct != " " else cmap['a']
    data['struct']= _create_container_format()
#     Format.objects(format_id=format_.format_id).update_one(xml_article_format=data)
    format_._xml_article_format = data
    format_.save()
    return format_.reload()


In [14]:
def create_format( **kwargs ):
    fm = Format(**kwargs)
    fm.save()
    return fm

In [15]:
# create_format(source_name="asnfjd", format_id="sadfhjksgh", source_home_link="dsfjhkfjdg", 
# xml_article_format = XMLContainerFormat(struct=None, content_type=None),
# html_article_format = ContainerFormat(idens=[]) )

In [16]:
def print_all_formats(format_):
    keys = ['xml_collection_format', 'html_collection_format', 'html_article_format', 'xml_article_format']
    format_keys= [] 
    for key in keys:
        if hasattr(format_, key) and getattr(format_, key) is not None:
            format_keys.append(key)
    if hasattr(format_, "extra_formats") and format_.extra_formats is not None:
        format_keys += list(format_.extra_formats.keys())
    for index, key in enumerate(format_keys):
        print(index, ". ", key,"\n")
    print("\n"*2)
    return format_keys


In [17]:
def interactive_format_prompt(source_name, source_id, source_home_link, formatter = None):
    formats = Format.objects(source_home_link= source_home_link)
    if formats.count() == 0:
        format_ = create_format(source_name=source_name, format_id=source_id, source_home_link=source_home_link)
    else:
        format_ = formats.get()
    while True:
        keys = print_all_formats(format_)
        kmap = {'xml_collection_format': 'xc', 'html_collection_format': 'hc', 'html_article_format': 'ha', 'xml_article_format': 'xa', 'extra_formats': 'e'}
        prompt = "Please press "
        for key in kmap.keys():
            if key not in keys:
                prompt += f"{kmap[key]} for {key}, "
        prompt +=", 'q' for quit: "
        action = input(prompt)
        if is_quit_param(action):
            break
        elif action.lower() == 'xc':
            format_ = create_collection_format('xml_collection_format', format_)
        elif action.lower() == 'hc':
            format_ = create_collection_format('html_collection_format', format_)
        elif action.lower() == 'ha':
            format_ = create_html_container_format(format_)
        elif action.lower() == 'xa':
            format_ = create_xml_container_format(format_)
        elif action.lower() == 'e':
            format_ = create_extra_format(format_)
    keys = print_all_formats(format_)
    action = input("Enter index number for selecting the formatter, or q for quit: ")
    ask_for_default_formatter = input('Do you want to set this formatter as default for source y/n: ').lower() == "y"
    if ( not is_quit_param(action)) and action.isdigit():
        formatter = keys[int(action)]        
    return formatter, ask_for_default_formatter

    
    
    
    
        

    
    


In [18]:
def _create_source_map(category, source_name, link, rss = 'FALSE', third_party_rss=False, wild_card_link:bool = False):
    is_rss = rss is not None and len(rss) > 0 and rss.upper() != "FALSE"
    url = rss if is_rss else link
    assumed_tags=  tag_formatter(category)
    compulsory_tags = input("Enter compulsory tags for \n" + url)
    compulsory_tags = tag_formatter(compulsory_tags)
    if compulsory_tags == "q":
        return None
    compulsory_tags = compulsory_tags.split(" ")
    watermarks = input("Enter watermarks for \n" + url).split(" ")
    content_type = input("Content type of this url a for articles, i for images and v for videos: ")
    is_link_multiple = input("is link contains articles directly y/n: ").lower() == "y"
    if content_type == "i":
        content_type = ContentType.Image
    elif content_type == "v":
        content_type = ContentType.Video
    else:
        content_type = ContentType.Article
    home_url = link if is_rss and third_party_rss else url
    url_parse = urlparse(home_url)
    source_home_link = url_parse.scheme +"://" + url_parse.netloc
    source_maps = SourceMap.objects(Q(source_home_link=source_home_link) & Q(is_rss=is_rss))
    if source_maps.count() > 0:
        source_map = source_maps.first()
    else:
        source_map = None

    source_map_compulsory_tags = source_map.compulsory_tags if source_map is not None and source_map.compulsory_tags is not None else []
    source_map_assumed_tags = source_map.assumed_tags if source_map is not None and source_map.assumed_tags is not None else ""
    source_map_assumed_tags += " " + assumed_tags
    source_map_watermarks = source_map.watermarks if source_map is not None and source_map.watermarks is not None else []
    source_map_watermarks += watermarks

    if len(compulsory_tags) > 0:
            compulsory_tags_action = input("Enter a for append, r for replace and l for leave: ")
            if compulsory_tags_action.lower() == "a":
                source_map_compulsory_tags += compulsory_tags
                source_map_compulsory_tags = list(set(source_map_compulsory_tags))
            elif compulsory_tags_action.lower() == "r":
                source_map_compulsory_tags = compulsory_tags
    if source_map is None:
            is_collection = input("Is the source a collection y/n: ").lower() == "y"
            source_locs = input("Enter source locations. ")
            source_map = create_map(name=source_name, source_id=token_urlsafe(16)+ "_" + source_name.lower(),
                                        formatter=None,assumed_tags=source_map_assumed_tags, compulsory_tags=source_map_compulsory_tags,
                                        home_link=source_home_link, watermarks=source_map_watermarks, is_rss=is_rss,is_collection=is_collection,
                                        source_locations=source_locs.split(" ")            
            )
    formatter, is_update_default_formatter = interactive_format_prompt(source_map.source_name, source_map.source_id, source_map.source_home_link, formatter=source_map.formatter)
    if is_update_default_formatter:
#         SourceMap.objects(source_id=source_map.source_id).update_one(set__formatter= formatter)
        source_map.formatter = formatter
    if source_map.links is None:
        source_map._links = []
    source_map._links.append(LinkStore(link=url, assumed_tags=assumed_tags,
                                     compulsory_tags=compulsory_tags if len(compulsory_tags) == 0 else None,
                                     content_type=content_type,
                                     is_multiple=is_link_multiple,
                                     formatter=formatter))
    source_map._links=[link.to_dict() for link in source_map.links]
    source_map.compulsory_tags=list(set(source_map_compulsory_tags))
    source_map.save()
    return source_map.reload() 

    
    
    

In [19]:
raise  Exception()

Exception: 

In [20]:
# category = "womens_fashion celebirity_fashion__contest"
category = "space_technology__astronomy"
source_name="Sky And Telescope"
link = "https://www.vogue.in/horoscope/"
rss="https://skyandtelescope.org/astronomy-news/feed/"
third_party = False

In [None]:
_create_source_map(category=category, source_name=source_name, link=link, rss=rss
                   , third_party_rss=third_party)

Enter compulsory tags for 
https://skyandtelescope.org/astronomy-news/feed/
Enter watermarks for 
https://skyandtelescope.org/astronomy-news/feed/
Content type of this url a for articles, i for images and v for videos: a
is link contains articles directly y/n: n
Enter a for append, r for replace and l for leave: r
Is the source a collection y/n: y
Enter source locations. US
0 .  xml_collection_format 

1 .  html_collection_format 




Please press ha for html_article_format, xa for xml_article_format, e for extra_formats, , 'q' for quit: xc
Enter itertag for this selector: item
Please enter sel value for title: 
Please enter param value for title: 
Please enter parent value for title: 
Please enter type value for title: 
Please enter is_multiple value for title: 
Please enter is_cdata value for title: 
Please enter sel value for link: 
Please enter param value for link: 
Please enter parent value for link: 
Please enter type value for link: 
Please enter is_multiple value for link: 
Pl

In [None]:
raise Exception()

In [117]:
def create_youtube_source(link, category):
    url = urlparse(link)
    channel_id = url.path.replace("/channel","").replace("/videos", "").replace("/","")
    youtube_sources = SourceMap.objects(Q(source_id= "youtube") &Q(is_third_party= True))
    if not (youtube_sources.count() > 0):
        youtube_source = SourceMap.objects.insert([SourceMap(source_name="YouTube", source_id="youtube", source_home_link="https://www.youtube.com/", is_third_party=True, is_collection=False, is_rss=False, links=[], formatter=None, assumed_tags="", compulsory_tags=[])])
    else:
        youtube_source = youtube_sources.get()
    assumed_tags = category
    compulsory_tags = input("Enter compulsory tags ")
    if compulsory_tags == "":
        compulsory_tags = []
    else:
        compulsory_tags = compulsory_tags.split(" ")
    ctag_action = input("Press a for append, r for replace and l for leave, for action on compulsory_tags")
    if ctag_action.lower() == "a":
        youtube_source.compulsory_tags += compulsory_tags
    elif ctag_action.lower() == "r":
        youtube_source.compulsory_tags = compulsory_tags
    link_store = LinkStore(link=channel_id, assumed_tags=tag_formatter(assumed_tags), content_type=ContentType.Video, is_multiple=True, compulsory_tags=compulsory_tags)
    youtube_source._links.append(link_store.to_dict())
    youtube_source.compulsory_tags = list(set(youtube_source.compulsory_tags))
    youtube_source.save()
    return youtube_source







In [118]:
category = "celebirity_fashion__contest celebrity__gossips"
# c_id = "UCg4_1iEPv3IlgTvJh2IZAjQ"
link = f"UCkxP6nWL35Yq6QEjiwWBITw"

In [119]:
create_youtube_source(link=link, category=category)

Enter compulsory tags 
Press a for append, r for replace and l for leave, for action on compulsory_tagsa


<SourceMap: SourceMap object>